0150ea9d34337ea80ee87e1e8cc081f65ae89f83,graylog2-shared/src/main/java/org/graylog2/shared/journal/KafkaJournal.java,LogRetentionCleaner,cleanupSegmentsToMaintainSize,#Log#,714

Before Change


                return 0;
            }
            final long[] diff = {kafkaLog.size() - retentionSize};
            return kafkaLog.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // sigh scala
                @Override
                public Object apply(LogSegment segment) {
                    if (diff[0] - segment.size() >= 0) {
                        diff[0] -= segment.size();
                        loggerForCleaner.debug(
                                "[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}",
                                segment.baseOffset(),
                                segment.size(),
                                diff[0],
                                retentionSize);
                        return true;
                    } else {
                        return false;
                    }
                }
            });
        }

        private int cleanupSegmentsToRemoveCommitted(Log kafkaLog) {

After Change


                return 0;
            }
            final long[] diff = {currentSize - retentionSize};
            int deletedSegments = kafkaLog.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // sigh scala
                @Override
                public Object apply(LogSegment segment) {
                    if (diff[0] - segment.size() >= 0) {
                        diff[0] -= segment.size();
                        loggerForCleaner.debug(
                                "[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}",
                                segment.baseOffset(),
                                segment.size(),
                                diff[0],
                                retentionSize);
                        return true;
                    } else {
                        return false;
                    }
                }
            });
            KafkaJournal.this.purgedSegmentsInLastRetention.set(deletedSegments);
            return deletedSegments;
        }

        private int cleanupSegmentsToRemoveCommitted(Log kafkaLog) {